package com.uziot.bucket.common.countdownlatch;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

/**
 * @author shidt
 * @version V1.0
 * @className CountDownLatchTaskService
 * @date 2021-11-22 21:42:46
 * @description 批处理任务异步处理服务
 * 将任务并发执行，并控制并发量为执行许可数量
 * 使用减数计数器和信号量实现并发控制
 */

@Slf4j
public abstract class CountDownLatchTaskService {
    /**
     * 线程池
     */
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

    /**
     * 处理过程
     *
     * @param taskList 任务了列表
     * @param permits  允许的并发线程数
     */
    public void process(List<Object> taskList, int permits) {
        // 1.声明一个信号量允许的并发线程数量
        final Semaphore semaphore = new Semaphore(permits);
        try {
            if (null != taskList && !taskList.isEmpty()) {
                // 2.声明计数器总数为任务列表的大小
                final CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
                // 3.批次任务遍历处理执行任务
                for (final Object task : taskList) {
                    // 4.获取一个许可证后才允许执行否则主线程等待
                    semaphore.acquireUninterruptibly();

                    // 5.线程池提交任务
                    threadPoolTaskExecutor.submit(new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            try {
                                // 6.异步执行任务实现
                                return handlerTask(task);
                            } finally {
                                log.info("countDownLatch={}", countDownLatch.getCount());
                                // 计数器减数
                                countDownLatch.countDown();
                                // 释放许可
                                semaphore.release();
                            }
                        }
                    });
                }
                // 6.主线程等待所有任务完成后放行
                countDownLatch.await();
            } else {
                if (log.isInfoEnabled()) {
                    log.info("没有获取到批次信息！");
                }
            }
            log.info("所有批次任务执行完成！");
        } catch (Exception e) {
            log.error("处理批次信息发生异常，具体信息为：{}", e.getMessage(), e);
        }
    }

    /**
     * 异步处理任务的子方法具体执行任务方法
     *
     * @param task 任务
     * @return Object o
     */
    public abstract Object handlerTask(Object task);

}
